package com.gcloud.mesh.dcs.chain;

import com.gcloud.mesh.dcs.dao.SchedulerJobDao;
import com.gcloud.mesh.dcs.dao.SchedulerStepDao;
import com.gcloud.mesh.dcs.entity.SchedulerStepEntity;
import com.gcloud.mesh.dcs.enums.SchedulerJobStatus;
import com.gcloud.mesh.dcs.enums.SchedulerStepStatus;
import com.gcloud.mesh.dcs.service.MigrateScheduleResultHandler;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * 迁移任务调度状态查询
 */

@Slf4j
@Component
public class MigrationStatusSchedule {

    @Autowired
    private SchedulerStepDao schedulerStepDao;

    @Autowired
    private SchedulerJobDao schedulerJobDao;

    @Autowired
    private FilterChain filterChain;

    @Autowired
    private RedisUtil redisUtil;

    @Autowired
    private MigrateScheduleResultHandler migrateScheduleResultHandler;

    @Scheduled(fixedDelay = 1000 * 10)
    public void doExec() {

        log.info("==============【每隔10秒】迁移任务调度状态查询==============");

        String sql = "select * from dcs_scheduler_steps t where " +
                " t.status = 1 and end_time is null and sync_exec = 0 " +
                " and now() < TIMESTAMPADD(MINUTE, 12*60, begin_time)" +
                " order by begin_time" +
                "";
        List<SchedulerStepEntity> stepEntityList = schedulerStepDao.findBySql(sql, SchedulerStepEntity.class);
        if(stepEntityList!=null && !stepEntityList.isEmpty()){
            for (SchedulerStepEntity schedulerStepEntity : stepEntityList) {
                try {
                    String schedulerJobId = schedulerStepEntity.getSchedulerJobId();
                    String processClass = schedulerStepEntity.getProcessClass();
                    String attach = schedulerStepEntity.getAttach();
                    Class<IMigrationStepChain> clazz = (Class<IMigrationStepChain>) Class.forName(processClass);
                    IMigrationStepChain stepChain = SpringContextUtils.getBean(clazz);
                    if(stepChain==null) {
                        //更新查询失败状态
                        schedulerStepDao.updateStepStatusById(schedulerStepEntity.getId(), SchedulerStepStatus.FAIL,null);
                        continue;
                    }
                    if(stepChain.checkStatus(schedulerStepEntity.getId())){

                        List<IMigrationStepChain> baseChains = filterChain.getChainMap().get(stepChain.chainType());
                        if(baseChains==null) {
                            schedulerStepDao.updateStepStatusById(schedulerStepEntity.getId(), SchedulerStepStatus.FAIL,null);
                            continue;
                        }
                        AtomicInteger position = new AtomicInteger();
                        Optional<IMigrationStepChain> currentChain = baseChains.stream().peek(c->position.incrementAndGet()).filter(c -> c.getClass().getName().equals(stepChain.getClass().getName())).findFirst();
                        if(currentChain.isPresent()){
                            int pos = position.get();
                            filterChain.finishPreStepStatus(schedulerJobId,stepChain.getExecResult());
                            if(pos>=baseChains.size()) {
                                //最后一步
                                schedulerJobDao.updateJobStatusById(schedulerJobId, SchedulerJobStatus.SUCCESS);
                                //zhangdp增加修改数据库的相关记录
                                migrateScheduleResultHandler.handleMigrate(schedulerJobId);
                                continue;
                            }
                            IMigrationStepChain nextStep = baseChains.get(pos);
                            Object o = redisUtil.get(MigrationConstant.ATTACH_KEY_EXEC + schedulerJobId);
                            if(o!=null) attach = o.toString();
                            nextStep.doExec(schedulerJobId,attach,filterChain,nextStep);
                        }
                    }
                }catch (Exception e){
                    schedulerStepDao.updateStepStatusById(schedulerStepEntity.getId(), SchedulerStepStatus.FAIL,e.getMessage());
                    schedulerJobDao.updateJobStatusById(schedulerStepEntity.getSchedulerJobId(), SchedulerJobStatus.FAIL);
                    log.error("迁移任务调度执行下一步操作失败",e);
                }
            }
        }






    }

}
